{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "google",
   "metadata": {},
   "source": [
    "##### Copyright 2025 Google LLC."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "apache",
   "metadata": {},
   "source": [
    "Licensed under the Apache License, Version 2.0 (the \"License\");\n",
    "you may not use this file except in compliance with the License.\n",
    "You may obtain a copy of the License at\n",
    "\n",
    "    http://www.apache.org/licenses/LICENSE-2.0\n",
    "\n",
    "Unless required by applicable law or agreed to in writing, software\n",
    "distributed under the License is distributed on an \"AS IS\" BASIS,\n",
    "WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
    "See the License for the specific language governing permissions and\n",
    "limitations under the License.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "basename",
   "metadata": {},
   "source": [
    "# line_balancing_sat"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "link",
   "metadata": {},
   "source": [
    "<table align=\"left\">\n",
    "<td>\n",
    "<a href=\"https://colab.research.google.com/github/google/or-tools/blob/main/examples/notebook/examples/line_balancing_sat.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\"/>Run in Google Colab</a>\n",
    "</td>\n",
    "<td>\n",
    "<a href=\"https://github.com/google/or-tools/blob/main/examples/python/line_balancing_sat.py\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\"/>View source on GitHub</a>\n",
    "</td>\n",
    "</table>"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "doc",
   "metadata": {},
   "source": [
    "First, you must install the [ortools](https://pypi.org/project/ortools/) package in this Colab."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "install",
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install ortools"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "description",
   "metadata": {},
   "source": [
    "\n",
    "Reader and solver for the simple assembly line balancing problem (SALBP).\n",
    "\n",
    "from https://assembly-line-balancing.de/salbp/:\n",
    "\n",
    "The simple assembly line balancing problem (SALBP) is the basic optimization\n",
    "problem in assembly line balancing research. Given is a set of tasks each of\n",
    "which has a deterministic task time. The tasks are partially ordered by\n",
    "precedence relations defining a precedence graph as depicted below.\n",
    "\n",
    "It reads .alb files:\n",
    "    https://assembly-line-balancing.de/wp-content/uploads/2017/01/format-ALB.pdf\n",
    "\n",
    "and solves the corresponding problem.\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "code",
   "metadata": {},
   "outputs": [],
   "source": [
    "import collections\n",
    "import re\n",
    "from typing import Dict, Sequence\n",
    "\n",
    "from ortools.sat.colab import flags\n",
    "from google.protobuf import text_format\n",
    "\n",
    "from ortools.sat.python import cp_model\n",
    "\n",
    "# Notebook parameters declared through the or-tools colab flags helper.\n",
    "# The `.value` attribute of each definition is read further down in this cell.\n",
    "#\n",
    "# Path of the file that main() hands to read_problem().\n",
    "_INPUT = flags.define_string(\"input\", \"\", \"Input file to parse and solve.\")\n",
    "# When non-empty, parsed by text_format.Parse() into the solver parameters.\n",
    "_PARAMS = flags.define_string(\"params\", \"\", \"Sat solver parameters.\")\n",
    "# When non-empty, the CP-SAT model is exported to this path before solving.\n",
    "_OUTPUT_PROTO = flags.define_string(\n",
    "    \"output_proto\", \"\", \"Output file to write the cp_model proto to.\"\n",
    ")\n",
    "# main() runs the greedy heuristic in every case, then dispatches on\n",
    "# \"boolean\" or \"scheduling\"; any other value runs no exact model.\n",
    "_MODEL = flags.define_string(\n",
    "    \"model\", \"boolean\", \"Model used: boolean, scheduling, greedy\"\n",
    ")\n",
    "\n",
    "\n",
    "class SectionInfo:\n",
    "    \"\"\"Store problem information for each section of the input file.\"\"\"\n",
    "\n",
    "    def __init__(self):\n",
    "        self.value = None\n",
    "        self.index_map = {}\n",
    "        self.set_of_pairs = set()\n",
    "\n",
    "    def __str__(self):\n",
    "        if self.index_map:\n",
    "            return f\"SectionInfo(index_map={self.index_map})\"\n",
    "        elif self.set_of_pairs:\n",
    "            return f\"SectionInfo(set_of_pairs={self.set_of_pairs})\"\n",
    "        elif self.value is not None:\n",
    "            return f\"SectionInfo(value={self.value})\"\n",
    "        else:\n",
    "            return \"SectionInfo()\"\n",
    "\n",
    "\n",
    "def read_problem(filename: str) -> Dict[str, SectionInfo]:\n",
    "    \"\"\"Parses a .alb file into a map from section name to its content.\n",
    "\n",
    "    A `<name>` line opens a new section (`<end>` is ignored). Every other\n",
    "    non-blank line is stored in the most recently opened section as a single\n",
    "    number, a `key value` entry or a `left,right` pair. Lines matching none of\n",
    "    these shapes are reported on stdout and skipped.\n",
    "\n",
    "    Args:\n",
    "        filename: path of the file to read.\n",
    "\n",
    "    Returns:\n",
    "        The sections found in the file, keyed by section name.\n",
    "    \"\"\"\n",
    "\n",
    "    sections: Dict[str, SectionInfo] = {}\n",
    "    # Data lines met before any section header land in this unregistered holder.\n",
    "    active_section = SectionInfo()\n",
    "\n",
    "    with open(filename, \"r\") as alb_file:\n",
    "        print(f\"Reading problem from '{filename}'\")\n",
    "\n",
    "        for raw_line in alb_file:\n",
    "            text = raw_line.strip()\n",
    "            if text == \"\":\n",
    "                continue\n",
    "\n",
    "            # Section header, e.g. '<number of tasks>'.\n",
    "            header = re.fullmatch(r\"<([\\w\\s]+)>\", text)\n",
    "            if header is not None:\n",
    "                title = header.group(1)\n",
    "                if title != \"end\":\n",
    "                    active_section = SectionInfo()\n",
    "                    sections[title] = active_section\n",
    "                continue\n",
    "\n",
    "            # A lone integer is the scalar value of the section.\n",
    "            scalar = re.fullmatch(r\"^([0-9]+)$\", text)\n",
    "            if scalar is not None:\n",
    "                active_section.value = int(scalar.group(1))\n",
    "                continue\n",
    "\n",
    "            # Two integers separated by whitespace form a table entry.\n",
    "            entry = re.fullmatch(r\"^([0-9]+)\\s+([0-9]+)$\", text)\n",
    "            if entry is not None:\n",
    "                index, amount = (int(group) for group in entry.groups())\n",
    "                active_section.index_map[index] = amount\n",
    "                continue\n",
    "\n",
    "            # Two integers separated by a comma form a pair.\n",
    "            couple = re.fullmatch(r\"^([0-9]+),([0-9]+)$\", text)\n",
    "            if couple is not None:\n",
    "                active_section.set_of_pairs.add(\n",
    "                    tuple(int(group) for group in couple.groups())\n",
    "                )\n",
    "                continue\n",
    "\n",
    "            print(f\"Unrecognized line '{text}'\")\n",
    "\n",
    "    return sections\n",
    "\n",
    "\n",
    "def print_stats(problem: Dict[str, SectionInfo]) -> None:\n",
    "    print(\"Problem Statistics\")\n",
    "    for key, value in problem.items():\n",
    "        print(f\"  - {key}: {value}\")\n",
    "\n",
    "\n",
    "def solve_problem_greedily(problem: Dict[str, SectionInfo]) -> Dict[int, int]:\n",
    "    \"\"\"Compute a greedy solution.\"\"\"\n",
    "    print(\"Solving using a Greedy heuristics\")\n",
    "\n",
    "    num_tasks = problem[\"number of tasks\"].value\n",
    "    if num_tasks is None:\n",
    "        return {}\n",
    "    all_tasks = range(1, num_tasks + 1)  # Tasks are 1 based in the data.\n",
    "    precedences = problem[\"precedence relations\"].set_of_pairs\n",
    "    durations = problem[\"task times\"].index_map\n",
    "    cycle_time = problem[\"cycle time\"].value\n",
    "\n",
    "    weights = collections.defaultdict(int)\n",
    "    successors = collections.defaultdict(list)\n",
    "\n",
    "    candidates = set(all_tasks)\n",
    "\n",
    "    for before, after in precedences:\n",
    "        weights[after] += 1\n",
    "        successors[before].append(after)\n",
    "        if after in candidates:\n",
    "            candidates.remove(after)\n",
    "\n",
    "    assignment: Dict[int, int] = {}\n",
    "    current_pod = 0\n",
    "    residual_capacity = cycle_time\n",
    "\n",
    "    while len(assignment) < num_tasks:\n",
    "        if not candidates:\n",
    "            print(\"error empty\")\n",
    "            break\n",
    "\n",
    "        best = -1\n",
    "        best_slack = cycle_time\n",
    "        best_duration = 0\n",
    "\n",
    "        for c in candidates:\n",
    "            duration = durations[c]\n",
    "            slack = residual_capacity - duration\n",
    "            if slack < best_slack and slack >= 0:\n",
    "                best_slack = slack\n",
    "                best = c\n",
    "                best_duration = duration\n",
    "\n",
    "        if best == -1:\n",
    "            current_pod += 1\n",
    "            residual_capacity = cycle_time\n",
    "            continue\n",
    "\n",
    "        candidates.remove(best)\n",
    "        assignment[best] = current_pod\n",
    "        residual_capacity -= best_duration\n",
    "\n",
    "        for succ in successors[best]:\n",
    "            weights[succ] -= 1\n",
    "            if weights[succ] == 0:\n",
    "                candidates.add(succ)\n",
    "                del weights[succ]\n",
    "\n",
    "    print(f\"  greedy solution uses {current_pod + 1} pods.\")\n",
    "\n",
    "    return assignment\n",
    "\n",
    "\n",
    "def solve_problem_with_boolean_model(\n",
    "    problem: Dict[str, SectionInfo], hint: Dict[int, int]\n",
    ") -> None:\n",
    "    \"\"\"Solves the given problem with a Boolean assignment model.\n",
    "\n",
    "    The model is built with CP-SAT, exported to the file named by the\n",
    "    output_proto flag when that flag is set, and solved with search logging\n",
    "    enabled.\n",
    "\n",
    "    Args:\n",
    "        problem: parsed sections; reads \"number of tasks\", \"task times\",\n",
    "          \"precedence relations\" and \"cycle time\".\n",
    "        hint: task -> pod assignment, used to bound the number of pods and to\n",
    "          seed the search. It may be empty or partial.\n",
    "    \"\"\"\n",
    "\n",
    "    print(\"Solving using the Boolean model\")\n",
    "    # problem data\n",
    "    num_tasks = problem[\"number of tasks\"].value\n",
    "    if num_tasks is None:\n",
    "        return\n",
    "    all_tasks = range(1, num_tasks + 1)  # Tasks are 1 based in the problem.\n",
    "    durations = problem[\"task times\"].index_map\n",
    "    precedences = problem[\"precedence relations\"].set_of_pairs\n",
    "    cycle_time = problem[\"cycle time\"].value\n",
    "\n",
    "    # Only a hint covering every task proves an upper bound on the number of\n",
    "    # pods. Otherwise fall back to one pod per task, which is always enough\n",
    "    # for a feasible instance.\n",
    "    hint_is_complete = bool(hint) and all(t in hint for t in all_tasks)\n",
    "    num_pods = max(hint.values()) + 1 if hint_is_complete else num_tasks\n",
    "    all_pods = range(num_pods)\n",
    "\n",
    "    model = cp_model.CpModel()\n",
    "\n",
    "    # assign[t, p] indicates if task t is done on pod p.\n",
    "    assign = {}\n",
    "    # possible[t, p] indicates if task t is possible on pod p.\n",
    "    possible = {}\n",
    "\n",
    "    # Create the variables\n",
    "    for t in all_tasks:\n",
    "        for p in all_pods:\n",
    "            assign[t, p] = model.new_bool_var(f\"assign_{t}_{p}\")\n",
    "            possible[t, p] = model.new_bool_var(f\"possible_{t}_{p}\")\n",
    "\n",
    "    # active[p] indicates if pod p is active.\n",
    "    active = [model.new_bool_var(f\"active_{p}\") for p in all_pods]\n",
    "\n",
    "    # Each task is done on exactly one pod.\n",
    "    for t in all_tasks:\n",
    "        model.add_exactly_one([assign[t, p] for p in all_pods])\n",
    "\n",
    "    # Total tasks assigned to one pod cannot exceed cycle time.\n",
    "    for p in all_pods:\n",
    "        model.add(sum(assign[t, p] * durations[t] for t in all_tasks) <= cycle_time)\n",
    "\n",
    "    # Maintain the possible variables:\n",
    "    #   possible at pod p -> possible at any pod after p\n",
    "    for t in all_tasks:\n",
    "        for p in range(num_pods - 1):\n",
    "            model.add_implication(possible[t, p], possible[t, p + 1])\n",
    "\n",
    "    # Link assign and possible variables: a task assigned to pod p is possible\n",
    "    # at p and not possible at p - 1. The second implication applies to every\n",
    "    # pod that has a predecessor, pod 1 included.\n",
    "    for t in all_tasks:\n",
    "        for p in all_pods:\n",
    "            model.add_implication(assign[t, p], possible[t, p])\n",
    "            if p > 0:\n",
    "                model.add_implication(assign[t, p], ~possible[t, p - 1])\n",
    "\n",
    "    # Precedences.\n",
    "    for before, after in precedences:\n",
    "        for p in range(1, num_pods):\n",
    "            model.add_implication(assign[before, p], ~possible[after, p - 1])\n",
    "\n",
    "    # Link active variables with the assign one.\n",
    "    for p in all_pods:\n",
    "        all_assign_vars = [assign[t, p] for t in all_tasks]\n",
    "        for a in all_assign_vars:\n",
    "            model.add_implication(a, active[p])\n",
    "        model.add_bool_or(all_assign_vars + [~active[p]])\n",
    "\n",
    "    # Force pods to be contiguous. This is critical to get good lower bounds\n",
    "    # on the objective, even if it makes feasibility harder.\n",
    "    for p in range(1, num_pods):\n",
    "        model.add_implication(~active[p - 1], ~active[p])\n",
    "        for t in all_tasks:\n",
    "            model.add_implication(~active[p], possible[t, p - 1])\n",
    "\n",
    "    # Objective.\n",
    "    model.minimize(sum(active))\n",
    "\n",
    "    # add search hinting from the greedy solution. Only hinted tasks that map\n",
    "    # to an existing variable are used, so an empty or partial hint is fine.\n",
    "    for t, p in hint.items():\n",
    "        if (t, p) in assign:\n",
    "            model.add_hint(assign[t, p], 1)\n",
    "\n",
    "    if _OUTPUT_PROTO.value:\n",
    "        print(f\"Writing proto to {_OUTPUT_PROTO.value}\")\n",
    "        model.export_to_file(_OUTPUT_PROTO.value)\n",
    "\n",
    "    # solve model.\n",
    "    solver = cp_model.CpSolver()\n",
    "    if _PARAMS.value:\n",
    "        text_format.Parse(_PARAMS.value, solver.parameters)\n",
    "    solver.parameters.log_search_progress = True\n",
    "    solver.solve(model)\n",
    "\n",
    "\n",
    "def solve_problem_with_scheduling_model(\n",
    "    problem: Dict[str, SectionInfo], hint: Dict[int, int]\n",
    ") -> None:\n",
    "    \"\"\"Solves the given problem using a cumulative model.\n",
    "\n",
    "    Each task is a unit-size interval starting at its pod index, with its\n",
    "    duration as demand; the cumulative capacity is the cycle time. The model\n",
    "    is exported to the file named by the output_proto flag when that flag is\n",
    "    set, and solved with search logging enabled.\n",
    "\n",
    "    Args:\n",
    "        problem: parsed sections; reads \"number of tasks\", \"task times\",\n",
    "          \"precedence relations\" and \"cycle time\".\n",
    "        hint: task -> pod assignment, used to bound the number of pods and to\n",
    "          seed the search. It may be empty or partial.\n",
    "    \"\"\"\n",
    "\n",
    "    print(\"Solving using the scheduling model\")\n",
    "    # Problem data\n",
    "    num_tasks = problem[\"number of tasks\"].value\n",
    "    if num_tasks is None:\n",
    "        return\n",
    "    all_tasks = range(1, num_tasks + 1)  # Tasks are 1 based in the data.\n",
    "    durations = problem[\"task times\"].index_map\n",
    "    precedences = problem[\"precedence relations\"].set_of_pairs\n",
    "    cycle_time = problem[\"cycle time\"].value\n",
    "\n",
    "    # Only a hint covering every task proves an upper bound on the number of\n",
    "    # pods; otherwise fall back to one pod per task.\n",
    "    hint_is_complete = bool(hint) and all(t in hint for t in all_tasks)\n",
    "    num_pods = max(hint.values()) + 1 if hint_is_complete else num_tasks\n",
    "\n",
    "    model = cp_model.CpModel()\n",
    "\n",
    "    # pod[t] indicates on which pod the task is performed.\n",
    "    pods = {}\n",
    "    for t in all_tasks:\n",
    "        pods[t] = model.new_int_var(0, num_pods - 1, f\"pod_{t}\")\n",
    "\n",
    "    # Create the variables\n",
    "    intervals = []\n",
    "    demands = []\n",
    "    for t in all_tasks:\n",
    "        interval = model.new_fixed_size_interval_var(pods[t], 1, \"\")\n",
    "        intervals.append(interval)\n",
    "        demands.append(durations[t])\n",
    "\n",
    "    # add terminating interval as the objective. It consumes the whole\n",
    "    # capacity from obj_var up to num_pods + 1, so every task must sit on a\n",
    "    # pod strictly before obj_var.\n",
    "    obj_var = model.new_int_var(1, num_pods, \"obj_var\")\n",
    "    obj_size = model.new_int_var(1, num_pods, \"obj_duration\")\n",
    "    obj_interval = model.new_interval_var(\n",
    "        obj_var, obj_size, num_pods + 1, \"obj_interval\"\n",
    "    )\n",
    "    intervals.append(obj_interval)\n",
    "    demands.append(cycle_time)\n",
    "\n",
    "    # Cumulative constraint.\n",
    "    model.add_cumulative(intervals, demands, cycle_time)\n",
    "\n",
    "    # Precedences.\n",
    "    for before, after in precedences:\n",
    "        model.add(pods[after] >= pods[before])\n",
    "\n",
    "    # Objective.\n",
    "    model.minimize(obj_var)\n",
    "\n",
    "    # add search hinting from the greedy solution. Only hinted tasks are used,\n",
    "    # so an empty or partial hint is fine.\n",
    "    for t, p in hint.items():\n",
    "        if t in pods:\n",
    "            model.add_hint(pods[t], p)\n",
    "\n",
    "    if _OUTPUT_PROTO.value:\n",
    "        print(f\"Writing proto to {_OUTPUT_PROTO.value}\")\n",
    "        model.export_to_file(_OUTPUT_PROTO.value)\n",
    "\n",
    "    # solve model.\n",
    "    solver = cp_model.CpSolver()\n",
    "    if _PARAMS.value:\n",
    "        text_format.Parse(_PARAMS.value, solver.parameters)\n",
    "    solver.parameters.log_search_progress = True\n",
    "    solver.solve(model)\n",
    "\n",
    "\n",
    "def main(argv: Sequence[str] = ()) -> None:\n",
    "    \"\"\"Reads the input problem, runs the greedy heuristic, then the selected model.\n",
    "\n",
    "    Args:\n",
    "        argv: leftover positional command-line arguments. Defaults to none, so\n",
    "          the function can be called without arguments from a notebook cell.\n",
    "\n",
    "    Raises:\n",
    "        ValueError: if more than one positional argument is given.\n",
    "    \"\"\"\n",
    "    if len(argv) > 1:\n",
    "        # `app` (absl) is not imported in this notebook, so a builtin exception\n",
    "        # is raised instead of app.UsageError.\n",
    "        raise ValueError(\"Too many command-line arguments.\")\n",
    "\n",
    "    if not _INPUT.value:\n",
    "        # The input flag is empty by default; open(\"\") would only fail with a\n",
    "        # confusing FileNotFoundError.\n",
    "        print(\"No input file given: set the 'input' flag to the path of a .alb file.\")\n",
    "        return\n",
    "\n",
    "    problem = read_problem(_INPUT.value)\n",
    "    print_stats(problem)\n",
    "    # The greedy solution is always computed; it seeds the exact models.\n",
    "    greedy_solution = solve_problem_greedily(problem)\n",
    "\n",
    "    if _MODEL.value == \"boolean\":\n",
    "        solve_problem_with_boolean_model(problem, greedy_solution)\n",
    "    elif _MODEL.value == \"scheduling\":\n",
    "        solve_problem_with_scheduling_model(problem, greedy_solution)\n",
    "\n",
    "\n",
    "main()\n",
    "\n"
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
